﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using OF.DistributeService.Core.Common;
using OF.Notify.DataHost;
using OF.Notify.DataHost.Cluster;
using OF.Notify.DataHost.Cluster.Entity;
using OF.Notify.Entity;
using OF.Notify.Master.Zookeeper;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace OF.Notify.Master
{
    public class ConsumerTopicClusterMaster
    {
        internal byte topicEnum;
        internal byte consumerEnum;
        internal TopicClusterMaster topicClusterMaster;

        internal IdTreeNode topicConsumerProcessedMergeTree = null;
        internal MergeContext topicConsmerProcessedMergeContext = null;
        internal MergeSession topicConsumerProcessedMergeSession = null;
        internal Thread processThread = null;

        public ConsumerTopicClusterMaster(byte topicEnum, byte consumerEnum, TopicClusterMaster topicClusterMaster)
        {
            this.topicEnum = topicEnum;
            this.consumerEnum = consumerEnum;
            this.topicClusterMaster = topicClusterMaster;

            var context = ClusterContext.Get();
            var topicContext = context.GetTopicContext(topicEnum);
            int[] calArray = IdTreeNode.GetCalArray(topicContext.GetConsumerContext(consumerEnum).GetProcessedCountArray());
            this.topicConsumerProcessedMergeTree = IdTreeNode.NewRootNode(context.GetProcessedMaxValue());
            this.topicConsumerProcessedMergeSession = new MergeSession(topicContext.GetMasterStoragePath(), calArray);
            this.topicConsmerProcessedMergeContext = new MergeContext(topicConsumerProcessedMergeSession);

            this.processThread = new Thread(new ThreadStart(DoProcessCollections));
            processThread.Start();
        }

        public List<int> GetProcessedData()
        {
            int dirMaxLevel = topicConsumerProcessedMergeSession.calArray.Length - 1;
            int maxDataCollectionId = topicClusterMaster.topicClusterIdProvider.GetMaxDataCollectionId();
            var param = new IdTreeNode.GetQueryNodesParam
            {
                count = short.MaxValue,
                dirMaxLevel = dirMaxLevel,
                minValue = 0,
                maxValue = maxDataCollectionId,
                result = new List<int>()
            };
            topicConsumerProcessedMergeTree.GetExistsNodes(param);
            return param.result;
        }

        internal void DoProcessCollections()
        {
            try
            {
                var context = ClusterContext.Get();
                int startUpInterval = context.GetStartupWaitProcessInterval();
                for (int i1 = 0; i1 < startUpInterval; i1++)
                {
                    Thread.Sleep(1000);
                }

                if (this.topicClusterMaster.clusterMaster.isDisposed)
                {
                    return;
                }
                int processCountOnce = context.GetProcessCountOnce();
                int minValue = 0;
                int dirMaxLevel = topicConsumerProcessedMergeSession.calArray.Length - 1;
                while (true)
                {
                    try
                    {
                        if (topicClusterMaster.clusterMaster.isDisposed)
                        {
                            return;
                        }

                        int maxDataCollectionId = topicClusterMaster.topicClusterIdProvider.GetMaxDataCollectionId();
                        //Util.LogInfo("maxDataCollectionId is:" + maxDataCollectionId);
                        while (true)
                        {
                            if (topicClusterMaster.clusterMaster.isDisposed)
                            {
                                return;
                            }
                            var param = new IdTreeNode.GetQueryNodesParam
                            {
                                count = processCountOnce,
                                dirMaxLevel = dirMaxLevel,
                                minValue = minValue,
                                maxValue = maxDataCollectionId,
                                result = new List<int>()
                            };
                            topicConsumerProcessedMergeTree.GetNotExistsNodes(param);
                            var result = param.result;
                            var onLineClusterList = topicClusterMaster.onLineTopicClusterDict.Values.ToList();
                            if (result == null || result.Count == 0)
                            {
                                AssignOnInstantProcessCollections(onLineClusterList);
                                minValue = 0;
                                //Util.LogInfo("end process:" + DateTime.Now + ",max is:" + param.maxValue);
                                break;
                            }
                            if (topicClusterMaster.clusterMaster.isDisposed)
                            {
                                return;
                            }
                            ConcurrentDictionary<short, List<int>> idDict = null;
                            var exceptOnInstanceCollections = result.Where(clusterId => !IsCollectionOnInstantProcess(onLineClusterList, clusterId)).ToList();
                            if (exceptOnInstanceCollections != null && exceptOnInstanceCollections.Count > 0)
                            {
                                GetUnProcessedCollectionResultDTO resultDTO = MultiSendGetUnProcessedFinishedCollections(exceptOnInstanceCollections);
                                idDict = resultDTO.nodeUnProcessCollectionsDict;
                                List<int> processedCollectionList = resultDTO.processedCollectionList;
                                if (processedCollectionList != null && processedCollectionList.Count > 0)
                                {
                                    DoMergeProcessResult(context, processedCollectionList);
                                }
                            }
                            if (topicClusterMaster.clusterMaster.isDisposed)
                            {
                                return;
                            }
                            if (idDict != null && idDict.Count > 0)
                            {
                                var collectionNodeGroup = idDict.SelectMany(item => item.Value.Select(collectionId => new KeyValuePair<int, short>(collectionId, item.Key)))
                                    .GroupBy(collectionNode => collectionNode.Key);
                                Random rnd = new Random();
                                var collectionRandomeNode = collectionNodeGroup.Select(item => new KeyValuePair<int, short>(item.Key, item.OrderBy(titem => rnd.Next()).First().Value));
                                var nodeGroupCollection = collectionRandomeNode.GroupBy(item => item.Value);
                                Dictionary<short, List<int>> nodeCollectionDict = new Dictionary<short, List<int>>();
                                foreach (var nodeGroup in nodeGroupCollection)
                                {
                                    nodeCollectionDict.Add(nodeGroup.Key, nodeGroup.Select(item => item.Key).ToList());
                                }

                                if (topicClusterMaster.clusterMaster.isDisposed)
                                {
                                    return;
                                }
                                var processedCollections = MultiSendProcessCollections(nodeCollectionDict).ToList();
                                DoMergeProcessResult(context, processedCollections);
                            }
                            minValue = result[result.Count - 1] + 1;
                        }
                    }
                    catch (Exception ex)
                    {
                        Util.LogInfo(ex.ToString() + "\ntopicClusterMaster.clusterMaster.isDisposed:" + topicClusterMaster.clusterMaster.isDisposed);
                        if (topicClusterMaster.clusterMaster.isDisposed)
                        {
                            return;
                        }
                    }
                    Thread.Sleep(50);
                }
            }
            catch (Exception ex)
            {
                Util.LogException("DoProcessCollections", ex);
                if (topicClusterMaster.clusterMaster.isDisposed)
                {
                    return;
                }
                throw;
            }
        }

        private void DoMergeProcessResult(ClusterContext context, List<int> processedCollections)
        {
            bool isTreeMerged = false;
            if (processedCollections != null)
            {
                foreach (var processedCollectionId in processedCollections)
                {
                    IdTreeNode newNode = IdTreeNode.NewIdTree(topicConsumerProcessedMergeSession.calArray, processedCollectionId, context.GetProcessedMaxValue());
                    this.topicConsumerProcessedMergeTree.MergeNode(newNode, this.topicConsmerProcessedMergeContext);
                    isTreeMerged = true;
                }
            }
            if (isTreeMerged)
            {
                if (topicClusterMaster.clusterMaster.isDisposed)
                {
                    return;
                }

                MultiSendSyncProcessedTree(processedCollections);
            }
        }

        internal class GetUnProcessedCollectionResultDTO
        {
            public ConcurrentDictionary<short, List<int>> nodeUnProcessCollectionsDict;
            public List<int> processedCollectionList;
        }

        internal GetUnProcessedCollectionResultDTO MultiSendGetUnProcessedFinishedCollections(List<int> unProcessedCollections)
        {
            GetUnProcessedCollectionResultDTO result = new GetUnProcessedCollectionResultDTO();
            List<DataNodeProxy> onLineNodes = topicClusterMaster.clusterMaster.dataNodeProxyDict.Values.ToList();
            SendGetUnProcessedFinishedCollectionsRequest request = new SendGetUnProcessedFinishedCollectionsRequest
            {
                topicEnum = topicEnum,
                consumerEnum = consumerEnum, 
                unProcessedCollections = unProcessedCollections
            };
            ConcurrentDictionary<short, List<int>> nodeFinishedCollectionsDict = new ConcurrentDictionary<short, List<int>>();
            ConcurrentDictionary<int, bool> onProcessCollectionDict = new ConcurrentDictionary<int, bool>();
            ConcurrentDictionary<int, bool> processedCollectionDict = new ConcurrentDictionary<int, bool>();
            Parallel.ForEach(onLineNodes, ClusterContext.GetParallelOptions(), (node) => {
                if (topicClusterMaster.clusterMaster.isDisposed)
                {
                    return;
                }
                var callResult = node.SendGetUnProcessedFinishedCollections(request);
                if (callResult.IsSuccess())
                {
                    SendGetUnProcessedFinishedCollectionsDTO findDTO = callResult.Data;
                    List<int> finishedCollectionList = findDTO.finishedCollectionList;
                    if (finishedCollectionList != null && finishedCollectionList.Count > 0)
                    {
                        nodeFinishedCollectionsDict.TryAdd(node.GetId(), finishedCollectionList);
                    }

                    if (findDTO.processingCollectionList != null)
                    {
                        foreach (int collectionId in findDTO.processingCollectionList)
                        {
                            onProcessCollectionDict.TryAdd(collectionId, true);
                        }
                    }

                    if (findDTO.processedCollectionList != null)
                    {
                        foreach (int collectionId in findDTO.processedCollectionList)
                        {
                            processedCollectionDict.TryAdd(collectionId, true);
                        }
                    }
                }
            });

            ConcurrentDictionary<short, List<int>> nodeUnProcessCollectionsDict = new ConcurrentDictionary<short, List<int>>();
            foreach (var kv in nodeFinishedCollectionsDict)
            {
                List<int> finishedCollections = kv.Value;
                List<int> unprocessColelctions = finishedCollections.Where(finishCollectionId => !onProcessCollectionDict.ContainsKey(finishCollectionId)
                    && !processedCollectionDict.ContainsKey(finishCollectionId)).ToList();
                nodeUnProcessCollectionsDict.TryAdd(kv.Key, unprocessColelctions);
            }
            result.nodeUnProcessCollectionsDict = nodeUnProcessCollectionsDict;
            result.processedCollectionList = processedCollectionDict.Keys.ToList();
            return result;
        }

        internal ConcurrentBag<int> MultiSendProcessCollections(Dictionary<short, List<int>> nodeCollectionsDict)
        {
            ConcurrentBag<int> bag = new ConcurrentBag<int>();
            Parallel.ForEach(nodeCollectionsDict, ClusterContext.GetParallelOptions(), (nodeCollections) => {
                if (topicClusterMaster.clusterMaster.isDisposed)
                {
                    return;
                }
                DataNodeProxy node = null;
                if (!topicClusterMaster.clusterMaster.dataNodeProxyDict.TryGetValue(nodeCollections.Key, out node))
                {
                    return;
                }

                var callResult = node.SendProcessCollections(new SendProcessCollectionsRequest
                {
                    topicEnum = topicEnum,
                    consumerEnum = consumerEnum,
                    collections = nodeCollections.Value
                });
                if (!callResult.IsSuccess())
                {
                    return;
                }

                var result = callResult.Data;
                if (result != null)
                {
                    foreach (var id in result)
                    {
                        bag.Add(id);
                    }
                }
            });
            return bag;
        }

        internal bool IsCollectionOnInstantProcess(List<TopicCluster> onLineClusterList, int collectionId)
        {
            TopicCluster returnCluster = onLineClusterList.FirstOrDefault(cluster => cluster.IsCollectionOnInstantProcess(consumerEnum, collectionId));
            return returnCluster != null;
        }

        public void MergeNode(int collectionId)
        {
            var context = ClusterContext.Get();
            IdTreeNode newNode = IdTreeNode.NewIdTree(topicConsumerProcessedMergeSession.calArray, collectionId, context.GetProcessedMaxValue());
            this.topicConsumerProcessedMergeTree.MergeNode(newNode, this.topicConsmerProcessedMergeContext);
        }

        public void MergeNode(IdTreeNode nodeProcessedTree)
        {
            this.topicConsumerProcessedMergeTree.MergeNode(nodeProcessedTree, topicConsmerProcessedMergeContext);            
        }

        public void MultiSendSyncProcessedTree(List<int> toDeleteCollections)
        {
            IdTreeNode processedTreeNode = this.topicConsumerProcessedMergeTree;
            var request = new SendSyncProcessedTreeRequest { 
                topicEnum = topicEnum, consumerEnum = consumerEnum,
                processedTreeNode = processedTreeNode,
                toDeleteCollections = toDeleteCollections
            };
            Parallel.ForEach(topicClusterMaster.clusterMaster.dataNodeProxyDict.Values, ClusterContext.GetParallelOptions(), (node) => {
                node.SendSyncProcessedTree(request);
            });
        }

        internal void AssignOnInstantProcessCollections(List<TopicCluster> onLineClusterList)
        {
            if (topicClusterMaster.clusterMaster.isDisposed)
            {
                return;
            }
            int maxCount = ClusterContext.Get().GetMaxInstantProcessCollectionCount();
            int currentCount = 0;
            foreach (var cluster in onLineClusterList)
            {
                currentCount += cluster.GetOnInstanceProcessCollectionCount(consumerEnum);
            }

            if (currentCount >= maxCount)
            {
                return;
            }
            Dictionary<short, int> nodeCountDict = new Dictionary<short, int>(topicClusterMaster.clusterMaster.dataNodeProxyDict.Count);
            Func<short, int> nodeCountFunc = (nodeId) =>
            {
                if (nodeCountDict.ContainsKey(nodeId))
                {
                    return nodeCountDict[nodeId];
                }
                else
                {
                    return 0;
                }
            };

            Action<short> nodeCountIncAction = (nodeId) =>
            {
                if (nodeCountDict.ContainsKey(nodeId))
                {
                    nodeCountDict[nodeId] = nodeCountDict[nodeId] + 1;
                }
                else
                {
                    nodeCountDict[nodeId] = 1;
                }
            };
            foreach (var cluster in onLineClusterList)
            {
                int collectionId = 0;
                if (cluster.SetCurrentCollectionOnInstantProcess(consumerEnum, ref collectionId))
                {
                    short minAssignedNodeId = cluster.GetClusterNodeIds().OrderBy(nodeId => nodeCountFunc(nodeId)).First();
                    cluster.SetInstantProcessNodeId(consumerEnum, collectionId, minAssignedNodeId);
                    var sendProcessRequest = topicClusterMaster.clusterMaster.GetProxy(minAssignedNodeId).SendInstantProcessCollection(new SendInstantProcessCollectionRequest { collectionId = collectionId, topicEnum = topicEnum, consumerEnum = consumerEnum });
                    if (sendProcessRequest.IsSuccess() && sendProcessRequest.Data)
                    {
                        nodeCountIncAction(minAssignedNodeId);
                    }
                    else
                    {
                        cluster.RemoveCollectionOnInstantProcess(consumerEnum, collectionId);
                    }
                    currentCount++;
                    if (currentCount >= maxCount)
                    {
                        return;
                    }
                }
            }
        }
    }
}
